/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdds.scm.storage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;

/**
 * A stream for accessing multipart streams.
 */
public class MultipartInputStream extends ExtendedInputStream {

  private final String key;
  private final long length;

  // List of PartInputStream, one for each part of the key
  private final List<? extends PartInputStream> partStreams;

  // partOffsets[i] stores the index of the first data byte in
  // partStream w.r.t the whole key data.
  // For example, let’s say the part size is 200 bytes and part[0] stores
  // data from indices 0 - 199, part[1] from indices 200 - 399 and so on.
  // Then, partOffsets[0] = 0 (the offset of the first byte of data in
  // part[0]), partOffsets[1] = 200 and so on.
  private final long[] partOffsets;

  private boolean closed;
  // Index of the partStream corresponding to the current position of the
  // MultipartCryptoKeyInputStream.
  private int partIndex;

  // Tracks the partIndex corresponding to the last seeked position so that it
  // can be reset if a new position is seeked.
  private int prevPartIndex;

  public MultipartInputStream(String keyName,
                              List<? extends PartInputStream> inputStreams) {

    Preconditions.checkNotNull(inputStreams);

    this.key = keyName;
    this.partStreams = inputStreams;

    // Calculate and update the partOffsets
    this.partOffsets = new long[inputStreams.size()];
    int i = 0;
    long streamLength = 0L;
    for (PartInputStream partInputStream : inputStreams) {
      this.partOffsets[i++] = streamLength;
      streamLength += partInputStream.getLength();
    }
    this.length = streamLength;
  }

  @Override
  protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
      throws IOException {
    Preconditions.checkArgument(strategy != null);
    checkOpen();

    int totalReadLen = 0;
    while (strategy.getTargetLength() > 0) {
      if (partStreams.size() == 0 ||
          partStreams.size() - 1 <= partIndex &&
              partStreams.get(partIndex).getRemaining() == 0) {
        return totalReadLen == 0 ? EOF : totalReadLen;
      }

      // Get the current partStream and read data from it
      PartInputStream current = partStreams.get(partIndex);
      int numBytesToRead = getNumBytesToRead(strategy, current);
      int numBytesRead = strategy
          .readFromBlock((InputStream) current, numBytesToRead);
      checkPartBytesRead(numBytesToRead, numBytesRead, current);
      totalReadLen += numBytesRead;

      if (current.getRemaining() <= 0 &&
          partIndex + 1 < partStreams.size()) {
        partIndex += 1;
      }
    }
    return totalReadLen;
  }

  protected int getNumBytesToRead(ByteReaderStrategy strategy,
                                  PartInputStream current) throws IOException {
    return strategy.getTargetLength();
  }

  protected void checkPartBytesRead(int numBytesToRead, int numBytesRead,
                                    PartInputStream stream) throws IOException {
  }

  /**
   * Seeks the InputStream to the specified position. This involves 2 steps:
   * 1. Updating the partIndex to the partStream corresponding to the
   * seeked position.
   * 2. Seeking the corresponding partStream to the adjusted position.
   * <p>
   * For example, let’s say the part sizes are 200 bytes and part[0] stores
   * data from indices 0 - 199, part[1] from indices 200 - 399 and so on.
   * Let’s say we seek to position 240. In the first step, the partIndex
   * would be updated to 1 as indices 200 - 399 reside in partStream[1]. In
   * the second step, the partStream[1] would be seeked to position 40 (=
   * 240 - blockOffset[1] (= 200)).
   */
  @Override
  public synchronized void seek(long pos) throws IOException {
    checkOpen();
    if (pos == 0 && length == 0) {
      // It is possible for length and pos to be zero in which case
      // seek should return instead of throwing exception
      return;
    }
    if (pos < 0 || pos > length) {
      throw new EOFException(
          "EOF encountered at pos: " + pos + " for key: " + key);
    }

    // 1. Update the partIndex
    if (partIndex >= partStreams.size()) {
      partIndex = Arrays.binarySearch(partOffsets, pos);
    } else if (pos < partOffsets[partIndex]) {
      partIndex =
          Arrays.binarySearch(partOffsets, 0, partIndex, pos);
    } else if (pos >= partOffsets[partIndex] + partStreams
        .get(partIndex).getLength()) {
      partIndex = Arrays.binarySearch(partOffsets, partIndex + 1,
          partStreams.size(), pos);
    }
    if (partIndex < 0) {
      // Binary search returns -insertionPoint - 1  if element is not present
      // in the array. insertionPoint is the point at which element would be
      // inserted in the sorted array. We need to adjust the blockIndex
      // accordingly so that partIndex = insertionPoint - 1
      partIndex = -partIndex - 2;
    }

    // Reset the previous partStream's position
    partStreams.get(prevPartIndex).seek(0);

    // Reset all the partStreams above the partIndex. We do this to reset
    // any previous reads which might have updated the higher part
    // streams position.
    for (int index = partIndex + 1; index < partStreams.size(); index++) {
      partStreams.get(index).seek(0);
    }
    // 2. Seek the partStream to the adjusted position
    partStreams.get(partIndex).seek(pos - partOffsets[partIndex]);
    prevPartIndex = partIndex;
  }

  @Override
  public synchronized long getPos() throws IOException {
    return length == 0 ? 0 :
        partOffsets[partIndex] + partStreams.get(partIndex).getPos();
  }

  @Override
  public synchronized int available() throws IOException {
    checkOpen();
    long remaining = length - getPos();
    return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE;
  }

  @Override
  public synchronized void unbuffer() {
    for (PartInputStream stream : partStreams) {
      stream.unbuffer();
    }
  }

  @Override
  public synchronized long skip(long n) throws IOException {
    if (n <= 0) {
      return 0;
    }

    long toSkip = Math.min(n, length - getPos());
    seek(getPos() + toSkip);
    return toSkip;
  }

  @Override
  public synchronized void close() throws IOException {
    closed = true;
    for (PartInputStream stream : partStreams) {
      stream.close();
    }
  }

  /**
   * Verify that the input stream is open. Non blocking; this gives
   * the last state of the volatile {@link #closed} field.
   *
   * @throws IOException if the connection is closed.
   */
  private void checkOpen() throws IOException {
    if (closed) {
      throw new IOException(
          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + key);
    }
  }

  public long getLength() {
    return length;
  }

  @VisibleForTesting
  public synchronized int getCurrentStreamIndex() {
    return partIndex;
  }

  @VisibleForTesting
  public long getRemainingOfIndex(int index) throws IOException {
    return partStreams.get(index).getRemaining();
  }

  @VisibleForTesting
  public List<? extends PartInputStream> getPartStreams() {
    return partStreams;
  }
}
